Skip to content

Integrate Zenoh#2362

Open
paul-nechifor wants to merge 1 commit into
mainfrom
paul/feat-integrate-zenoh
Open

Integrate Zenoh#2362
paul-nechifor wants to merge 1 commit into
mainfrom
paul/feat-integrate-zenoh

Conversation

@paul-nechifor

@paul-nechifor paul-nechifor commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

Problem

We need to support Zenoh as well.

Closes DIM-955

Solution

How to Test

Run a blueprint with Zenoh communication:

uv run dimos --transport=zenoh --simulation run unitree-go2-agentic

Start humancli, also with Zenoh:

uv run humancli --transport=zenoh

Contributor License Agreement

  • I have read and approved the CLA.

@paul-nechifor paul-nechifor marked this pull request as draft June 5, 2026 02:33
@codecov

codecov Bot commented Jun 5, 2026

Copy link
Copy Markdown

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
2192 1 2191 35
View the top 1 failed test(s) by shortest run time
dimos.protocol.pubsub.impl.test_zenohpubsub.TestZenohPubSubBase::test_untyped_topic_with_dotted_segment_round_trips
Stack Traces | 0.505s run time
request = <SubRequest 'monitor_threads' for <Function test_untyped_topic_with_dotted_segment_round_trips>>

    @pytest.fixture(autouse=True)
    def monitor_threads(request):
        # Capture threads before test runs
        test_name = request.node.nodeid
        with _seen_threads_lock:
            _before_test_threads[test_name] = {
                t.ident for t in threading.enumerate() if t.ident is not None
            }
    
        yield
    
        with _seen_threads_lock:
            before = _before_test_threads.get(test_name, set())
            current = {t.ident for t in threading.enumerate() if t.ident is not None}
    
            # New threads are ones that exist now but didn't exist before this test
            new_thread_ids = current - before
    
            if not new_thread_ids:
                return
    
            # Get the actual thread objects for new threads
            new_threads = [
                t for t in threading.enumerate() if t.ident in new_thread_ids and t.name != "MainThread"
            ]
    
            # Filter out expected persistent threads that are shared globally
            # These threads are intentionally left running and cleaned up on process exit
            expected_persistent_thread_prefixes = [
                "Dask-Offload",
                # HuggingFace safetensors conversion thread - no user cleanup API
                # https://github..../transformers/issues/29513
                "Thread-auto_conversion",
            ]
            new_threads = [
                t
                for t in new_threads
                if not any(t.name.startswith(prefix) for prefix in expected_persistent_thread_prefixes)
            ]
    
            # Filter out threads we've already seen (from previous tests)
            truly_new = [t for t in new_threads if t.ident not in _seen_threads]
    
            # Mark all new threads as seen
            for t in new_threads:
                if t.ident is not None:
                    _seen_threads.add(t.ident)
    
            if not truly_new:
                return
    
            thread_names = [t.name for t in truly_new]
    
>           pytest.fail(
                f"Non-closed threads created during this test. Thread names: {thread_names}. "
                "Please look at the first test that fails and fix that."
            )
E           Failed: Non-closed threads created during this test. Thread names: ['Thread-2660 (pyo3-closure)']. Please look at the first test that fails and fix that.

before     = {140175489361600, 140185641771136}
current    = {140175489361600, 140176008935104, 140185641771136}
expected_persistent_thread_prefixes = ['Dask-Offload', 'Thread-auto_conversion']
new_thread_ids = {140176008935104}
new_threads = [<Thread(Thread-2660 (pyo3-closure), stopped 140176008935104)>]
request    = <SubRequest 'monitor_threads' for <Function test_untyped_topic_with_dotted_segment_round_trips>>
t          = <Thread(Thread-2660 (pyo3-closure), stopped 140176008935104)>
test_name  = '.../pubsub/impl/test_zenohpubsub.py::TestZenohPubSubBase::test_untyped_topic_with_dotted_segment_round_trips'
thread_names = ['Thread-2660 (pyo3-closure)']
truly_new  = [<Thread(Thread-2660 (pyo3-closure), stopped 140176008935104)>]

dimos/conftest.py:269: Failed

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch from b66e833 to f8d2d42 Compare June 5, 2026 02:34
@greptile-apps

greptile-apps Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR integrates Zenoh as a second transport backend alongside LCM, making it the platform default on macOS. The implementation adds ZenohPubSubBase/Zenoh/PickleZenoh pub/sub layers, a ZenohSessionPool for shared session management, ZenohRPC and ZenohTF for RPC and transform frames, and a new transport_factory.py that serves as the single backend-agnostic construction point used across all CLI tools, modules, and workers.

  • DIMOS_TRANSPORT env var or --transport CLI flag switches the backend globally; worker processes sync via python_worker.py before module instantiation; GlobalConfig carries zenoh_qos rules for per-publisher QoS tuning.
  • module.py now uses Field(default_factory=rpc_backend) / Field(default_factory=tf_backend) so ModuleConfig picks the right RPC/TF class at runtime, and rpc.start() is reordered to precede serve_module_rpc() because Zenoh needs an open session before subscribing.
  • All CLI tools (humancli, agentspy, dtop, dimos topic) are ported from hard-coded pLCMTransport to make_transport, and the resource-monitor topic was renamed from /dimos/resource_stats/resource_stats consistently across publisher and subscriber.

Confidence Score: 4/5

Safe to merge with the caveat that the DDSTransport race between broadcast/stop (flagged in a previous review thread) remains unresolved in this diff.

The Zenoh integration is well-structured and the new code does not introduce equivalent gaps — ZenohTransport uses _start_lock properly and the session pool never closes mid-publish. The unresolved DDSTransport broadcast/stop race from a prior thread is the primary concern.

dimos/core/transport.py (DDSTransport.broadcast/subscribe still unguarded by _start_lock)

Important Files Changed

Filename Overview
dimos/core/transport_factory.py New backend-agnostic factory: transport_topic, make_transport, apply_transport_arg, rpc_backend, tf_backend — cleanly delegates to global_config.transport.
dimos/core/transport.py Adds ZenohTransport and pZenohTransport; DDSTransport still missing _start_lock around broadcast/subscribe (pre-existing gap noted in review threads).
dimos/protocol/pubsub/impl/zenohpubsub.py ZenohPubSubBase with per-publisher cache, wildcard subscribe_all drain thread, and clean stop() with proper lock ordering — well structured.
dimos/protocol/service/zenohservice.py ZenohSessionPool shares sessions keyed by mode/connect/listen config; exposes close_all() for explicit teardown.
dimos/core/global_config.py Adds transport (AliasChoices DIMOS_TRANSPORT/transport) and zenoh_qos fields; platform-aware default (Darwin→zenoh, Linux→lcm).
dimos/core/coordination/module_coordinator.py _get_transport_for now calls make_transport; _coerce_transport_to_backend rebuilds LCM↔Zenoh transports from transport_map; lcm_configurators skipped on Zenoh backend.
dimos/core/coordination/python_worker.py Worker process syncs transport and zenoh_qos from host config (via kwargs["g"]) before module instantiation — correct ordering.
dimos/visualization/rerun/bridge.py _resolve_pubsubs treats [LCM()] as legacy default even when explicitly set — sentinel concern noted in previous thread; _default_pubsubs correctly reads config.g.transport.
dimos/protocol/rpc/pubsubrpc.py ZenohRPC added: mirrors LCMRPC pattern (explicit dual init + PubSubRPCMixin.super chain), topicgen namespaces under dimos/rpc/.
dimos/core/module.py rpc_transport/tf_transport now use Field(default_factory=rpc_backend/tf_backend); rpc.start() reordered before serve_module_rpc for Zenoh session requirement.
dimos/robot/cli/dimos.py global_config.update moved to main callback so all subcommands honor --transport; GenericAlias fields skipped from CLI; show_config simplified.
dimos/protocol/pubsub/impl/zenohqos.py Lightweight QoS rule model with sensible defaults: RPC/human/agent channels reliable+block; high-rate sensor streams best_effort+drop.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Process Start] --> B{DIMOS_TRANSPORT or --transport flag?}
    B -- set --> C[apply_transport_arg / global_config.update]
    B -- not set --> D{platform.system?}
    D -- Darwin --> E[transport = zenoh]
    D -- Linux --> F[transport = lcm]
    C --> G[global_config.transport resolved]
    E --> G
    F --> G
    G --> H[make_transport name msg_type]
    H --> I{transport == zenoh?}
    I -- yes typed --> J[ZenohTransport dimos/name/pkg.Msg]
    I -- yes pickled --> K[pZenohTransport dimos/name]
    I -- no typed --> L[LCMTransport /name]
    I -- no pickled --> M[pLCMTransport /name]
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
    A[Process Start] --> B{DIMOS_TRANSPORT or --transport flag?}
    B -- set --> C[apply_transport_arg / global_config.update]
    B -- not set --> D{platform.system?}
    D -- Darwin --> E[transport = zenoh]
    D -- Linux --> F[transport = lcm]
    C --> G[global_config.transport resolved]
    E --> G
    F --> G
    G --> H[make_transport name msg_type]
    H --> I{transport == zenoh?}
    I -- yes typed --> J[ZenohTransport dimos/name/pkg.Msg]
    I -- yes pickled --> K[pZenohTransport dimos/name]
    I -- no typed --> L[LCMTransport /name]
    I -- no pickled --> M[pLCMTransport /name]
Loading

Reviews (5): Last reviewed commit: "integrate zenoh" | Re-trigger Greptile

Comment thread dimos/protocol/service/zenohservice.py Outdated
Comment thread dimos/protocol/pubsub/impl/zenohpubsub.py Outdated
Comment thread dimos/visualization/rerun/bridge.py
Comment thread pyproject.toml
@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch 6 times, most recently from 0afc3a9 to 4472fc9 Compare June 9, 2026 00:56
def make_transport(
name: str, msg_type: type | None = None, *, g: GlobalConfig = global_config
) -> PubSubTransport[Any]:
"""Construct the active-backend pub/sub transport for a logical channel.

@leshy leshy Jun 9, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few things, Transport isn't neccessarily a PubSubTransport - we can use TCP and IP address as a setting here etc (in theory, haven't implemented)

in case of PubSubTransport, a string doesn't define a full topic, there is a reason why Topic for LCM and Zenoh is a different object. Zenoh offers QoS settings etc per channel. maybe specific router config etc.

So I'm thinking when global switch zenoh or LCM is used, for lcm that can literally be just LCM(topic_string) but zenoh probably wants reliable delivery for RPC specifically or specific per topic configuration (Image can be unreliable, but not agent messages)

I'm not sure right now what to suggest here - if we can normalize transport requirements across topics "this is reliable", "this is unreliable" or if we need per transport global blueprint config overlay that this global config switch just applies? global overlay seems better to me

return Topic(topic=topic)


class ZenohRPC(PubSubRPCMixin[Topic, Any], PickleZenoh):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this initially, but Zenoh actually supports RPC on their protocol level, so we dont need to piggyback to pubsub here.

@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch 2 times, most recently from eab7ba7 to ee8cb1f Compare June 10, 2026 05:51
@paul-nechifor paul-nechifor changed the title WIP: integrate zenoh Integrate Zenoh Jun 10, 2026
@paul-nechifor paul-nechifor marked this pull request as ready for review June 10, 2026 11:57
Comment thread dimos/core/transport.py
@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch from 96fead3 to 719fd0b Compare June 12, 2026 20:37
@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch 3 times, most recently from 4866c9c to 925fb38 Compare June 20, 2026 23:27
@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch from 39e3e4c to 32849cf Compare June 20, 2026 23:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants